package com.hzzftech.watchdog.busi.core.executor.utils;

import com.hzzftech.watchdog.busi.constants.BusiConstant;
import com.hzzftech.watchdog.busi.domain.KtDispatcher;
import com.hzzftech.watchdog.busi.domain.KtDispatcherFailedMsg;
import com.hzzftech.watchdog.busi.service.IKtDispatcherFailedMsgService;
import com.hzzftech.watchdog.busi.service.IKtDispatcherService;
import com.hzzftech.watchdog.common.core.domain.entity.SysUser;
import com.hzzftech.watchdog.common.utils.StringUtils;
import com.hzzftech.watchdog.common.utils.spring.SpringUtils;
import com.hzzftech.watchdog.system.service.ISysUserService;
import org.pentaho.di.cluster.SlaveServer;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.util.Utils;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobConfiguration;
import org.pentaho.di.job.JobExecutionConfiguration;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.www.RegisterJobServlet;
import org.pentaho.di.www.StartJobServlet;
import org.pentaho.di.www.WebResult;
import org.pentaho.metastore.api.IMetaStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URLEncoder;
import java.util.Date;

/**
 * JOB任务执行器工具类
 */
public class JobExecuteUtil {

    public static Logger logger = LoggerFactory.getLogger(JobExecuteUtil.class);

    public static String sendToSlaveServer(JobMeta meta, JobExecutionConfiguration configuration, IMetaStore metaStore) throws KettleException {
        String carteId;
        SlaveServer slaveServer = configuration.getRemoteServer();

        if (slaveServer == null) {
            throw new KettleException(BaseMessages.getString(Job.class, "Job.Log.NoSlaveServerSpecified"));
        }

        if (Utils.isEmpty(meta.getName())) {
            throw new KettleException(BaseMessages.getString(Job.class, "Job.Log.UniqueJobName"));
        }

        slaveServer.getLogChannel().setLogLevel(configuration.getLogLevel());

        try {
            // 执行参数的构造
            for (String var : Const.INTERNAL_TRANS_VARIABLES) {
                configuration.getVariables().put(var, meta.getVariable(var));
            }
            for (String var : Const.INTERNAL_JOB_VARIABLES) {
                configuration.getVariables().put(var, meta.getVariable(var));
            }

            String xml = new JobConfiguration(meta, configuration).getXML();
            if (logger.isDebugEnabled()) {
                logger.debug("kjb xml content: {}",xml);
            }
            // 发送JOB xml信息到carte服务器
            String regReply = slaveServer.sendXML(xml, RegisterJobServlet.CONTEXT_PATH+"/?xml=Y");
            WebResult regWebResult = WebResult.fromXMLString(regReply);
            if (!regWebResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
                throw new KettleException("There was an error posting the job on remote server: "+Const.CR+regWebResult.getMessage());
            }

            carteId = regWebResult.getId();

            // 查询carte 服务区执行JOB是否成功
            String startReply = slaveServer.execService(
                    StartJobServlet.CONTEXT_PATH
                            + "/?name="
                            + URLEncoder.encode(meta.getName(), "UTF-8")
                            + "&xml=Y&id="
                            + carteId);
            WebResult startWebResult = WebResult.fromXMLString(startReply);
            if (!startWebResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
                throw new KettleException("there was an error starting the job the remote server: "+Const.CR+startWebResult.getMessage());
            }
            return carteId;
        } catch (IOException e) {
            logger.error("发送Carte失败", e);
            throw new KettleException(e);
        } catch (Exception e) {
            logger.error("发送Carte失败", e);
            throw new KettleException(e);
        }
    }
}
